Debe asegurarse de correr este notebook bajo el ambiente de conda conda_daskpy36.
Asegúrese de tener las siguientes versiones de los paquetes.
!conda install scikit-learn=0.23.2 -c conda-forge -n python3 -y
!conda install -n python3 dask-ml=1.6.0 -c conda-forge -y
!conda install cloudpickle=1.6.0 -c conda-forge -y
!conda install s3fs=0.4.0 -c conda-forge -y
!conda install -c plotly plotly -y
!conda install nbformat -y
!conda list
Asegúrese de que la versión de todos los paquetes sea la misma en el nodo master, en los trabajadores y en el scheduler.
!conda install msgpack-python=1.0.0 -c conda-forge -y
!conda install numpy=1.19.1 -c conda-forge -y
!conda install toolz=0.10.0 -c conda-forge -y
!conda install tornado=6.0.4 -c conda-forge -y
from dask.distributed import Client
# Connect to the Fargate-hosted Dask scheduler through its service-discovery
# DNS name (port 8786 is the default Dask scheduler port).
# enable this client for fargate distributed cluster testing
client = Client('Dask-Scheduler.local-dask:8786')
!sudo aws ecs update-service --service Dask-Workers --desired-count 10 --cluster Fargate-Dask-Cluster
{
"service": {
"serviceArn": "arn:aws:ecs:us-east-1:125040470395:service/Fargate-Dask-Cluster/Dask-Workers",
"serviceName": "Dask-Workers",
"clusterArn": "arn:aws:ecs:us-east-1:125040470395:cluster/Fargate-Dask-Cluster",
"loadBalancers": [],
"serviceRegistries": [
{
"registryArn": "arn:aws:servicediscovery:us-east-1:125040470395:service/srv-uwwv2xzuuwcbbzao"
}
],
"status": "ACTIVE",
"desiredCount": 10,
"runningCount": 10,
"pendingCount": 0,
"launchType": "FARGATE",
"platformVersion": "LATEST",
"taskDefinition": "arn:aws:ecs:us-east-1:125040470395:task-definition/cloudformation-dask-workers-v1:1",
"deploymentConfiguration": {
"deploymentCircuitBreaker": {
"enable": false,
"rollback": false
},
"maximumPercent": 200,
"minimumHealthyPercent": 100
},
"deployments": [
{
"id": "ecs-svc/4740220229306881967",
"status": "PRIMARY",
"taskDefinition": "arn:aws:ecs:us-east-1:125040470395:task-definition/cloudformation-dask-workers-v1:1",
"desiredCount": 10,
"pendingCount": 0,
"runningCount": 10,
"failedTasks": 0,
"createdAt": 1632206435.615,
"updatedAt": 1632278988.333,
"launchType": "FARGATE",
"platformVersion": "1.4.0",
"networkConfiguration": {
"awsvpcConfiguration": {
"subnets": [
"subnet-0509b076eb2ed7abe"
],
"securityGroups": [
"sg-0401ff0dc3997b809"
],
"assignPublicIp": "ENABLED"
}
},
"rolloutState": "COMPLETED",
"rolloutStateReason": "ECS deployment ecs-svc/4740220229306881967 completed."
}
],
"roleArn": "arn:aws:iam::125040470395:role/aws-service-role/ecs.amazonaws.com/AWSServiceRoleForECS",
"events": [
{
"id": "ba7e4c5c-2f9a-4337-88e8-6506a35a0ad3",
"createdAt": 1632278988.338,
"message": "(service Dask-Workers) has reached a steady state."
},
{
"id": "360b4c26-9a90-4b2a-94cd-950cc506d5d2",
"createdAt": 1632278814.414,
"message": "(service Dask-Workers) has started 10 tasks: (task 1272fce6c53d457b90f4a817c4771cd5) (task 0f05e8a95a63411caa2425b7d18dce86) (task 528183e7071047b691450657832a3f6a) (task 1d5440b6aecd4940b826662a1f464329) (task 11f6cc94ad61474e935aadb64cc3be92) (task 0e86af8c93f04c988661bc9b735cee67) (task 0876c4ae732d4438b64d9ee8a6f276f2) (task 9abe5ed32b654a31a5f804d3d5f34dc8) (task ae0b0d6221b84f39a2fdbced7eaf0300) (task 78a28ed2855b4c8aa42df14811d87ea3)."
},
{
"id": "ccd7d6e0-34f7-49c4-adbc-89532d696f6a",
"createdAt": 1632263088.068,
"message": "(service Dask-Workers) has reached a steady state."
},
{
"id": "2ec08091-e4ec-4346-b09b-9238340348d5",
"createdAt": 1632262931.888,
"message": "(service Dask-Workers) has started 10 tasks: (task b2ea3bff53b740918971b1757b525391) (task f76886b624f146e3892ed47344d35a13) (task 30dee3980a8f44adaec6f9109b2850c8) (task 3f677cc0cc354825bc315153e6360db4) (task 9cf1dd5c140c4e6abbe2cc563720c150) (task ef87e98c0fe94e15a1bdc8b947528cf2) (task a0d2e7158d28422d972f983a07e13ccb) (task 415f37506d8f4ef0828365e3108b76a2) (task c1d5bc35adaa48439f7d96a86ea81533) (task f9c5467ea0e241e88e537ca97a8ea553)."
},
{
"id": "2daf3113-1be3-429e-af86-951f34a56d8b",
"createdAt": 1632248431.073,
"message": "(service Dask-Workers) has reached a steady state."
},
{
"id": "64a3d768-68e2-466e-9a4b-47685736e83f",
"createdAt": 1632226824.442,
"message": "(service Dask-Workers) has reached a steady state."
},
{
"id": "2fef85f1-37e2-4c28-b20f-9bf31a24ceff",
"createdAt": 1632226814.689,
"message": "(service Dask-Workers) has stopped 1 running tasks: (task 285fcc9c4d44495f9b1a6c2885d836b0)."
},
{
"id": "2253bb84-5a26-4447-8a91-d96ba8705614",
"createdAt": 1632226609.674,
"message": "(service Dask-Workers) stopped 9 pending tasks."
},
{
"id": "c0ce8abf-bf53-4b74-9923-fa09026ab205",
"createdAt": 1632226591.617,
"message": "(service Dask-Workers) has started 19 tasks: (task e7b36a7cc2e54a0b93b9c3e85b7b9446) (task 7864ca3dba824068bb472a1fa50749bf) (task 2e3e72a622384542a41c8c4c9164c50d) (task 1bf2bce6bbd64e5ead323f29ebca21a4) (task 66dd546089bf4fd59074eb9a191268b4) (task 9e5f2601f5e141bda479f825391cff5b) (task 5d031b528462465bb470427b06f36f00) (task 479a838399454c5fa3d209ed16384c90) (task fa404b4639d04492a8eccc5ba0c8f8c5) (task 4fbc40d3de9b46ee94cc6394a749be64) (task cafeab40c432494f8cff4ec1a3f9074f) (task ff15d378f78e41c9aaf0a41a24fc7a48) (task a2c9400805fd4f1d8b04b5226f590c47) (task 285fcc9c4d44495f9b1a6c2885d836b0) (task 91d4f7fc12414fa0b1b33010a519084b) (task 829c66b5e1b14fb2abac15116e83c6fd) (task 52663b5641af45079448385093a5a9b8) (task 4bb49dd678d0472f973f138719d291ea) (task 2e15704f16154d53b50bdc43b3374b5e)."
},
{
"id": "9725f0c7-30c1-4718-811e-7d31838078b5",
"createdAt": 1632206589.245,
"message": "(service Dask-Workers) has reached a steady state."
},
{
"id": "26875331-521e-424b-8234-4c357c8fd8c6",
"createdAt": 1632206589.244,
"message": "(service Dask-Workers) (deployment ecs-svc/4740220229306881967) deployment completed."
},
{
"id": "57c5bad9-9880-429c-9058-56476f50a726",
"createdAt": 1632206443.514,
"message": "(service Dask-Workers) has started 1 tasks: (task 479ce42b89f34ffca235a09ea6aac820)."
}
],
"createdAt": 1632206435.615,
"placementConstraints": [],
"placementStrategy": [],
"networkConfiguration": {
"awsvpcConfiguration": {
"subnets": [
"subnet-0509b076eb2ed7abe"
],
"securityGroups": [
"sg-0401ff0dc3997b809"
],
"assignPublicIp": "ENABLED"
}
},
"schedulingStrategy": "REPLICA",
"createdBy": "arn:aws:iam::125040470395:root",
"enableECSManagedTags": false,
"propagateTags": "NONE",
"enableExecuteCommand": false
}
}
# Restart all workers so they pick up the package versions installed above.
client.restart()
Client
|
Cluster
|
Dask es un wrapper de pandas que permite el procesamiento paralelo y distribuido en clústeres.
import s3fs
import dask.dataframe as dd

# Column parsing shared by every yearly extract of the NYC TLC yellow-taxi data.
_DATE_COLS = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
_DTYPES = {'RatecodeID': 'float64',
           'VendorID': 'float64',
           'passenger_count': 'float64',
           'payment_type': 'float64'}

def _read_yellow_tripdata(year):
    """Lazily read all monthly yellow-taxi CSVs for *year* from the public S3 bucket.

    Returns a dask DataFrame; no data is downloaded until a compute/persist.
    """
    return dd.read_csv(
        f's3://nyc-tlc/trip data/yellow_tripdata_{year}-*.csv',
        storage_options={'anon': True},  # public bucket, no credentials needed
        parse_dates=_DATE_COLS,
        dtype=_DTYPES,
    )

# The four read blocks below were previously copy-pasted verbatim; the helper
# removes the duplication while producing identical dataframes.
df_2017 = _read_yellow_tripdata(2017)
df_2018 = _read_yellow_tripdata(2018)
df_2019 = _read_yellow_tripdata(2019)
df_2020 = _read_yellow_tripdata(2020)
Actuan como tablas de lookup para poblar las columnas indexadas.
# Lookup table: TLC taxi-zone id -> zone name (ids 264/265 are unknown -> 'NA').
# Rebuilt as a single valid dict literal: the previous version had zone names
# split across wrapped display lines ('Mott Haven/Port \n Morris', 'West \n
# Brighton', ...), which broke the string literals and made the file a SyntaxError.
location_dic = {
    1: 'Newark Airport', 2: 'Jamaica Bay', 3: 'Allerton/Pelham Gardens', 4: 'Alphabet City',
    5: 'Arden Heights', 6: 'Arrochar/Fort Wadsworth', 7: 'Astoria', 8: 'Astoria Park',
    9: 'Auburndale', 10: 'Baisley Park', 11: 'Bath Beach', 12: 'Battery Park',
    13: 'Battery Park City', 14: 'Bay Ridge', 15: 'Bay Terrace/Fort Totten', 16: 'Bayside',
    17: 'Bedford', 18: 'Bedford Park', 19: 'Bellerose', 20: 'Belmont',
    21: 'Bensonhurst East', 22: 'Bensonhurst West', 23: 'Bloomfield/Emerson Hill', 24: 'Bloomingdale',
    25: 'Boerum Hill', 26: 'Borough Park', 27: 'Breezy Point/Fort Tilden/Riis Beach', 28: 'Briarwood/Jamaica Hills',
    29: 'Brighton Beach', 30: 'Broad Channel', 31: 'Bronx Park', 32: 'Bronxdale',
    33: 'Brooklyn Heights', 34: 'Brooklyn Navy Yard', 35: 'Brownsville', 36: 'Bushwick North',
    37: 'Bushwick South', 38: 'Cambria Heights', 39: 'Canarsie', 40: 'Carroll Gardens',
    41: 'Central Harlem', 42: 'Central Harlem North', 43: 'Central Park', 44: 'Charleston/Tottenville',
    45: 'Chinatown', 46: 'City Island', 47: 'Claremont/Bathgate', 48: 'Clinton East',
    49: 'Clinton Hill', 50: 'Clinton West', 51: 'Co-Op City', 52: 'Cobble Hill',
    53: 'College Point', 54: 'Columbia Street', 55: 'Coney Island', 56: 'Corona',
    57: 'Corona', 58: 'Country Club', 59: 'Crotona Park', 60: 'Crotona Park East',
    61: 'Crown Heights North', 62: 'Crown Heights South', 63: 'Cypress Hills', 64: 'Douglaston',
    65: 'Downtown Brooklyn/MetroTech', 66: 'DUMBO/Vinegar Hill', 67: 'Dyker Heights', 68: 'East Chelsea',
    69: 'East Concourse/Concourse Village', 70: 'East Elmhurst', 71: 'East Flatbush/Farragut', 72: 'East Flatbush/Remsen Village',
    73: 'East Flushing', 74: 'East Harlem North', 75: 'East Harlem South', 76: 'East New York',
    77: 'East New York/Pennsylvania Avenue', 78: 'East Tremont', 79: 'East Village', 80: 'East Williamsburg',
    81: 'Eastchester', 82: 'Elmhurst', 83: 'Elmhurst/Maspeth', 84: "Eltingville/Annadale/Prince's Bay",
    85: 'Erasmus', 86: 'Far Rockaway', 87: 'Financial District North', 88: 'Financial District South',
    89: 'Flatbush/Ditmas Park', 90: 'Flatiron', 91: 'Flatlands', 92: 'Flushing',
    93: 'Flushing Meadows-Corona Park', 94: 'Fordham South', 95: 'Forest Hills', 96: 'Forest Park/Highland Park',
    97: 'Fort Greene', 98: 'Fresh Meadows', 99: 'Freshkills Park', 100: 'Garment District',
    101: 'Glen Oaks', 102: 'Glendale', 103: "Governor's Island/Ellis Island/Liberty Island",
    104: "Governor's Island/Ellis Island/Liberty Island", 105: "Governor's Island/Ellis Island/Liberty Island",
    106: 'Gowanus', 107: 'Gramercy', 108: 'Gravesend', 109: 'Great Kills',
    110: 'Great Kills Park', 111: 'Green-Wood Cemetery', 112: 'Greenpoint', 113: 'Greenwich Village North',
    114: 'Greenwich Village South', 115: 'Grymes Hill/Clifton', 116: 'Hamilton Heights', 117: 'Hammels/Arverne',
    118: 'Heartland Village/Todt Hill', 119: 'Highbridge', 120: 'Highbridge Park', 121: 'Hillcrest/Pomonok',
    122: 'Hollis', 123: 'Homecrest', 124: 'Howard Beach', 125: 'Hudson Sq',
    126: 'Hunts Point', 127: 'Inwood', 128: 'Inwood Hill Park', 129: 'Jackson Heights',
    130: 'Jamaica', 131: 'Jamaica Estates', 132: 'JFK Airport', 133: 'Kensington',
    134: 'Kew Gardens', 135: 'Kew Gardens Hills', 136: 'Kingsbridge Heights', 137: 'Kips Bay',
    138: 'LaGuardia Airport', 139: 'Laurelton', 140: 'Lenox Hill East', 141: 'Lenox Hill West',
    142: 'Lincoln Square East', 143: 'Lincoln Square West', 144: 'Little Italy/NoLiTa', 145: 'Long Island City/Hunters Point',
    146: 'Long Island City/Queens Plaza', 147: 'Longwood', 148: 'Lower East Side', 149: 'Madison',
    150: 'Manhattan Beach', 151: 'Manhattan Valley', 152: 'Manhattanville', 153: 'Marble Hill',
    154: 'Marine Park/Floyd Bennett Field', 155: 'Marine Park/Mill Basin', 156: 'Mariners Harbor', 157: 'Maspeth',
    158: 'Meatpacking/West Village West', 159: 'Melrose South', 160: 'Middle Village', 161: 'Midtown Center',
    162: 'Midtown East', 163: 'Midtown North', 164: 'Midtown South', 165: 'Midwood',
    166: 'Morningside Heights', 167: 'Morrisania/Melrose', 168: 'Mott Haven/Port Morris', 169: 'Mount Hope',
    170: 'Murray Hill', 171: 'Murray Hill-Queens', 172: 'New Dorp/Midland Beach', 173: 'North Corona',
    174: 'Norwood', 175: 'Oakland Gardens', 176: 'Oakwood', 177: 'Ocean Hill',
    178: 'Ocean Parkway South', 179: 'Old Astoria', 180: 'Ozone Park', 181: 'Park Slope',
    182: 'Parkchester', 183: 'Pelham Bay', 184: 'Pelham Bay Park', 185: 'Pelham Parkway',
    186: 'Penn Station/Madison Sq West', 187: 'Port Richmond', 188: 'Prospect-Lefferts Gardens', 189: 'Prospect Heights',
    190: 'Prospect Park', 191: 'Queens Village', 192: 'Queensboro Hill', 193: 'Queensbridge/Ravenswood',
    194: 'Randalls Island', 195: 'Red Hook', 196: 'Rego Park', 197: 'Richmond Hill',
    198: 'Ridgewood', 199: 'Rikers Island', 200: 'Riverdale/North Riverdale/Fieldston', 201: 'Rockaway Park',
    202: 'Roosevelt Island', 203: 'Rosedale', 204: 'Rossville/Woodrow', 205: 'Saint Albans',
    206: 'Saint George/New Brighton', 207: 'Saint Michaels Cemetery/Woodside', 208: 'Schuylerville/Edgewater Park', 209: 'Seaport',
    210: 'Sheepshead Bay', 211: 'SoHo', 212: 'Soundview/Bruckner', 213: 'Soundview/Castle Hill',
    214: 'South Beach/Dongan Hills', 215: 'South Jamaica', 216: 'South Ozone Park', 217: 'South Williamsburg',
    218: 'Springfield Gardens North', 219: 'Springfield Gardens South', 220: 'Spuyten Duyvil/Kingsbridge', 221: 'Stapleton',
    222: 'Starrett City', 223: 'Steinway', 224: 'Stuy Town/Peter Cooper Village', 225: 'Stuyvesant Heights',
    226: 'Sunnyside', 227: 'Sunset Park East', 228: 'Sunset Park West', 229: 'Sutton Place/Turtle Bay North',
    230: 'Times Sq/Theatre District', 231: 'TriBeCa/Civic Center', 232: 'Two Bridges/Seward Park', 233: 'UN/Turtle Bay South',
    234: 'Union Sq', 235: 'University Heights/Morris Heights', 236: 'Upper East Side North', 237: 'Upper East Side South',
    238: 'Upper West Side North', 239: 'Upper West Side South', 240: 'Van Cortlandt Park', 241: 'Van Cortlandt Village',
    242: 'Van Nest/Morris Park', 243: 'Washington Heights North', 244: 'Washington Heights South', 245: 'West Brighton',
    246: 'West Chelsea/Hudson Yards', 247: 'West Concourse', 248: 'West Farms/Bronx River', 249: 'West Village',
    250: 'Westchester Village/Unionport', 251: 'Westerleigh', 252: 'Whitestone', 253: 'Willets Point',
    254: 'Williamsbridge/Olinville', 255: 'Williamsburg (North Side)', 256: 'Williamsburg (South Side)', 257: 'Windsor Terrace',
    258: 'Woodhaven', 259: 'Woodlawn/Wakefield', 260: 'Woodside', 261: 'World Trade Center',
    262: 'Yorkville East', 263: 'Yorkville West', 264: 'NA', 265: 'NA',
}
# Lookup table: TLC payment_type code -> human-readable label (codes start at 1).
_payment_labels = ["Credit card", "Cash", "No charge", "Dispute", "Unknown", "Voided trip"]
payment_dict = {code: label for code, label in enumerate(_payment_labels, start=1)}
display(df_2017.columns)

# Calendar features derived from the pickup timestamp.
pickup = df_2017['tpep_pickup_datetime']
df_2017["year"] = pickup.dt.year
df_2017["month"] = pickup.dt.month
df_2017["dayofmonth"] = pickup.dt.day
df_2017["dayofweek"] = pickup.dt.dayofweek  # Monday == 0
df_2017["hour"] = pickup.dt.hour

# Trip duration in seconds.
# NOTE(review): .dt.seconds is the seconds *component* of the timedelta (it
# wraps for trips longer than 24 h) — presumably fine for taxi trips, confirm.
df_2017['trip_dur_secs'] = (df_2017['tpep_dropoff_datetime'] - pickup).dt.seconds

# Swap numeric ids for readable labels via the lookup tables.
df_2017["DOLocation"] = df_2017["DOLocationID"].map(location_dic)
df_2017["PULocation"] = df_2017["PULocationID"].map(location_dic)
df_2017["payment_type"] = df_2017["payment_type"].map(payment_dict)

# Discard raw columns that are no longer needed after feature engineering.
cols_to_drop = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
                "RatecodeID", "store_and_fwd_flag", 'PULocationID', 'DOLocationID',
                "extra", "mta_tax", "tip_amount", "tolls_amount",
                "improvement_surcharge", "fare_amount"]
df_2017 = df_2017.drop(cols_to_drop, axis=1)
df_2017.head()
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
'total_amount'],
dtype='object')
| passenger_count | trip_distance | payment_type | total_amount | year | month | dayofmonth | dayofweek | hour | trip_dur_secs | DOLocation | PULocation | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 3.30 | Credit card | 15.30 | 2017 | 1 | 9 | 0 | 11 | 737 | Midtown Center | Yorkville West |
| 1 | 1.0 | 0.90 | Credit card | 7.25 | 2017 | 1 | 9 | 0 | 11 | 214 | Union Sq | Penn Station/Madison Sq West |
| 2 | 1.0 | 1.10 | Credit card | 7.30 | 2017 | 1 | 9 | 0 | 11 | 225 | Midtown Center | Midtown South |
| 3 | 1.0 | 1.10 | Credit card | 8.50 | 2017 | 1 | 9 | 0 | 11 | 323 | East Harlem South | Upper East Side North |
| 4 | 1.0 | 0.02 | Cash | 52.80 | 2017 | 1 | 1 | 6 | 0 | 0 | Union Sq | West Village |
display(df_2018.columns)

# Calendar features derived from the pickup timestamp.
pickup = df_2018['tpep_pickup_datetime']
df_2018["year"] = pickup.dt.year
df_2018["month"] = pickup.dt.month
df_2018["dayofmonth"] = pickup.dt.day
df_2018["dayofweek"] = pickup.dt.dayofweek  # Monday == 0
df_2018["hour"] = pickup.dt.hour

# Trip duration in seconds.
# NOTE(review): .dt.seconds wraps past 24 h — presumably fine here, confirm.
df_2018['trip_dur_secs'] = (df_2018['tpep_dropoff_datetime'] - pickup).dt.seconds

# Swap numeric ids for readable labels via the lookup tables.
df_2018["DOLocation"] = df_2018["DOLocationID"].map(location_dic)
df_2018["PULocation"] = df_2018["PULocationID"].map(location_dic)
df_2018["payment_type"] = df_2018["payment_type"].map(payment_dict)

# Discard raw columns that are no longer needed after feature engineering.
cols_to_drop = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
                "RatecodeID", "store_and_fwd_flag", 'PULocationID', 'DOLocationID',
                "extra", "mta_tax", "tip_amount", "tolls_amount",
                "improvement_surcharge", "fare_amount"]
df_2018 = df_2018.drop(cols_to_drop, axis=1)
df_2018.head()
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
'total_amount'],
dtype='object')
| passenger_count | trip_distance | payment_type | total_amount | year | month | dayofmonth | dayofweek | hour | trip_dur_secs | DOLocation | PULocation | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 0.5 | Cash | 5.80 | 2018 | 1 | 1 | 0 | 0 | 198 | Bloomingdale | Central Harlem |
| 1 | 1.0 | 2.7 | Cash | 15.30 | 2018 | 1 | 1 | 0 | 0 | 1090 | Lenox Hill East | Upper West Side South |
| 2 | 2.0 | 0.8 | Credit card | 8.30 | 2018 | 1 | 1 | 0 | 0 | 355 | Lenox Hill West | Yorkville East |
| 3 | 1.0 | 10.2 | Cash | 34.80 | 2018 | 1 | 1 | 0 | 0 | 1949 | Windsor Terrace | Lenox Hill East |
| 4 | 2.0 | 2.5 | Credit card | 16.55 | 2018 | 1 | 1 | 0 | 0 | 1068 | Upper West Side South | West Chelsea/Hudson Yards |
display(df_2019.columns)

# Calendar features derived from the pickup timestamp.
pickup = df_2019['tpep_pickup_datetime']
df_2019["year"] = pickup.dt.year
df_2019["month"] = pickup.dt.month
df_2019["dayofmonth"] = pickup.dt.day
df_2019["dayofweek"] = pickup.dt.dayofweek  # Monday == 0
df_2019["hour"] = pickup.dt.hour

# Trip duration in seconds.
# NOTE(review): .dt.seconds wraps past 24 h — presumably fine here, confirm.
df_2019['trip_dur_secs'] = (df_2019['tpep_dropoff_datetime'] - pickup).dt.seconds

# Swap numeric ids for readable labels via the lookup tables.
df_2019["DOLocation"] = df_2019["DOLocationID"].map(location_dic)
df_2019["PULocation"] = df_2019["PULocationID"].map(location_dic)
df_2019["payment_type"] = df_2019["payment_type"].map(payment_dict)

# Discard raw columns that are no longer needed; 2019+ files add
# congestion_surcharge, which is dropped as well.
cols_to_drop = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
                "RatecodeID", "store_and_fwd_flag", 'PULocationID', 'DOLocationID',
                "extra", "mta_tax", "tip_amount", "tolls_amount",
                "improvement_surcharge", "fare_amount", "congestion_surcharge"]
df_2019 = df_2019.drop(cols_to_drop, axis=1)
df_2019.head()
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
'total_amount', 'congestion_surcharge'],
dtype='object')
| passenger_count | trip_distance | payment_type | total_amount | year | month | dayofmonth | dayofweek | hour | trip_dur_secs | DOLocation | PULocation | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 1.5 | Credit card | 9.95 | 2019 | 1 | 1 | 1 | 0 | 400 | Upper West Side South | Manhattan Valley |
| 1 | 1.0 | 2.6 | Credit card | 16.30 | 2019 | 1 | 1 | 1 | 0 | 1152 | West Chelsea/Hudson Yards | Upper West Side South |
| 2 | 3.0 | 0.0 | Credit card | 5.80 | 2018 | 12 | 21 | 4 | 13 | 250 | Upper East Side North | Upper East Side North |
| 3 | 5.0 | 0.0 | Cash | 7.55 | 2018 | 11 | 28 | 2 | 15 | 200 | Queensbridge/Ravenswood | Queensbridge/Ravenswood |
| 4 | 5.0 | 0.0 | Cash | 55.55 | 2018 | 11 | 28 | 2 | 15 | 96 | Queensbridge/Ravenswood | Queensbridge/Ravenswood |
display(df_2020.columns)

# Calendar features derived from the pickup timestamp.
pickup = df_2020['tpep_pickup_datetime']
df_2020["year"] = pickup.dt.year
df_2020["month"] = pickup.dt.month
df_2020["dayofmonth"] = pickup.dt.day
df_2020["dayofweek"] = pickup.dt.dayofweek  # Monday == 0
df_2020["hour"] = pickup.dt.hour

# Trip duration in seconds.
# NOTE(review): .dt.seconds wraps past 24 h — presumably fine here, confirm.
df_2020['trip_dur_secs'] = (df_2020['tpep_dropoff_datetime'] - pickup).dt.seconds

# Swap numeric ids for readable labels via the lookup tables.
df_2020["DOLocation"] = df_2020["DOLocationID"].map(location_dic)
df_2020["PULocation"] = df_2020["PULocationID"].map(location_dic)
df_2020["payment_type"] = df_2020["payment_type"].map(payment_dict)

# Discard raw columns that are no longer needed; 2019+ files add
# congestion_surcharge, which is dropped as well.
cols_to_drop = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
                "RatecodeID", "store_and_fwd_flag", 'PULocationID', 'DOLocationID',
                "extra", "mta_tax", "tip_amount", "tolls_amount",
                "improvement_surcharge", "fare_amount", "congestion_surcharge"]
df_2020 = df_2020.drop(cols_to_drop, axis=1)
df_2020.head()
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
'total_amount', 'congestion_surcharge'],
dtype='object')
| passenger_count | trip_distance | payment_type | total_amount | year | month | dayofmonth | dayofweek | hour | trip_dur_secs | DOLocation | PULocation | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 1.2 | Credit card | 11.27 | 2020 | 1 | 1 | 2 | 0 | 288 | Upper West Side South | Upper West Side North |
| 1 | 1.0 | 1.2 | Credit card | 12.30 | 2020 | 1 | 1 | 2 | 0 | 445 | Upper West Side North | Upper West Side South |
| 2 | 1.0 | 0.6 | Credit card | 10.80 | 2020 | 1 | 1 | 2 | 0 | 371 | Upper West Side North | Upper West Side North |
| 3 | 1.0 | 0.8 | Credit card | 8.16 | 2020 | 1 | 1 | 2 | 0 | 291 | Manhattan Valley | Upper West Side North |
| 4 | 1.0 | 0.0 | Cash | 4.80 | 2020 | 1 | 1 | 2 | 0 | 138 | Queensbridge/Ravenswood | Queensbridge/Ravenswood |
# Stack the four yearly frames into one logical dataframe, then keep only rows
# whose pickup year falls in 2017-2020 (the raw files contain stray timestamps
# from other years — see e.g. 2018 rows inside the 2019 preview above).
df = dd.concat([df_2017, df_2018, df_2019, df_2020])
valid_years = df["year"].between(2017, 2020)
df = df[valid_years]
from dask.distributed import Client, progress
# Pin the combined dataframe in distributed memory for the analyses below.
df_persisted = client.persist(df)
df_persisted.head()
| passenger_count | trip_distance | payment_type | total_amount | year | month | dayofmonth | dayofweek | hour | trip_dur_secs | DOLocation | PULocation | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 3.30 | Credit card | 15.30 | 2017 | 1 | 9 | 0 | 11 | 737 | Midtown Center | Yorkville West |
| 1 | 1.0 | 0.90 | Credit card | 7.25 | 2017 | 1 | 9 | 0 | 11 | 214 | Union Sq | Penn Station/Madison Sq West |
| 2 | 1.0 | 1.10 | Credit card | 7.30 | 2017 | 1 | 9 | 0 | 11 | 225 | Midtown Center | Midtown South |
| 3 | 1.0 | 1.10 | Credit card | 8.50 | 2017 | 1 | 9 | 0 | 11 | 323 | East Harlem South | Upper East Side North |
| 4 | 1.0 | 0.02 | Cash | 52.80 | 2017 | 1 | 1 | 6 | 0 | 0 | Union Sq | West Village |
# Total number of trips across 2017-2020 after the year filter.
df_persisted.shape[0].compute()
325346861
# Number of columns remaining after feature engineering.
len(df_persisted.columns)
12
%%time
# Mean passenger count per (year, month); .compute() runs the job on the cluster.
df_mean_psngr_pickup_date = df_persisted.groupby(["year", "month"]).passenger_count.mean().compute()
CPU times: user 408 ms, sys: 8.64 ms, total: 416 ms Wall time: 2min 39s
import plotly.graph_objects as go
import calendar

# Chronological line chart of mean passengers per trip; x labels are "Month, Year".
monthly_means = df_mean_psngr_pickup_date.sort_index()
labels = [f"{calendar.month_name[month]}, {year}" for year, month in monthly_means.index]
fig = go.Figure(go.Scatter(mode="lines+markers", y=list(monthly_means), x=labels))
fig.show()
Note la periodicidad de los datos. Pese a la tendencia aparentemente negativa de la cantidad de pasajeros por viaje, existen picos en diciembre. Podemos atribuir esto, con bastante seguridad, a las festividades de la época del año.
Identifique también el comportamiento al inicio del 2020. La pandemia puede verse como un choque drástico al sistema y por lo tanto podemos empezar a pensar que debemos tratar estos datos con precaución.
%%time
# Mean total fare per (year, month), computed on the cluster.
df_mean_total_amount_date = df_persisted.groupby(["year", "month"]).total_amount.mean().compute()
CPU times: user 443 ms, sys: 2.08 ms, total: 445 ms Wall time: 2min 48s
# Chronological line chart of the mean fare; x labels are "Month, Year".
monthly_fares = df_mean_total_amount_date.sort_index()
labels = [f"{calendar.month_name[month]}, {year}" for year, month in monthly_fares.index]
fig = go.Figure(go.Scatter(mode="lines+markers", y=list(monthly_fares), x=labels))
fig.show()
%%time
# Trip volume per (year, month) — count of non-null passenger_count values.
df_trips_by_pickup_date = df_persisted.groupby(["year", "month"]).passenger_count.count().compute()
CPU times: user 216 ms, sys: 4.34 ms, total: 220 ms Wall time: 1min 49s
# Chronological line chart of monthly trip volume; x labels are "Month, Year".
monthly_trips = df_trips_by_pickup_date.sort_index()
labels = [f"{calendar.month_name[month]}, {year}" for year, month in monthly_trips.index]
fig = go.Figure(go.Scatter(mode="lines+markers", y=list(monthly_trips), x=labels))
fig.show()
import plotly.express as px

# Frequency of each passengers-per-trip value.
conteo_serie = df_persisted["passenger_count"].value_counts().compute()
frecuencias = conteo_serie.reset_index()
frecuencias.columns = ["passenger_count", 'Conteo']
display(frecuencias)
# Outlier party sizes (e.g. 96, 192 in the raw data) would dwarf the chart.
frecuencias = frecuencias[frecuencias["passenger_count"] < 10]
fig = px.bar(frecuencias, x="passenger_count", y="Conteo")
fig.show()
| passenger_count | Conteo | |
|---|---|---|
| 0 | 1.0 | 230725025 |
| 1 | 2.0 | 47768212 |
| 2 | 5.0 | 14159985 |
| 3 | 3.0 | 13500418 |
| 4 | 6.0 | 8632883 |
| 5 | 4.0 | 6386743 |
| 6 | 0.0 | 3114330 |
| 7 | 7.0 | 1257 |
| 8 | 8.0 | 987 |
| 9 | 9.0 | 849 |
| 10 | 192.0 | 2 |
| 11 | 96.0 | 1 |
# Frequency of each payment type label.
conteo_serie = df_persisted["payment_type"].value_counts().compute()
frecuencias = conteo_serie.reset_index()
frecuencias.columns = ["payment_type", 'Conteo']
display(frecuencias)
fig = px.bar(frecuencias, x="payment_type", y="Conteo")
fig.show()
| payment_type | Conteo | |
|---|---|---|
| 0 | Credit card | 225697131 |
| 1 | Cash | 96274913 |
| 2 | No charge | 1722319 |
| 3 | Dispute | 596275 |
| 4 | Unknown | 54 |
# Frequency of each drop-off zone.
conteo_serie = df_persisted["DOLocation"].value_counts().compute()
frecuencias = conteo_serie.reset_index()
frecuencias.columns = ["DOLocation", 'Conteo']
display(frecuencias)
fig = px.bar(frecuencias, x="DOLocation", y="Conteo")
fig.show()
| DOLocation | Conteo | |
|---|---|---|
| 0 | Upper East Side North | 12686526 |
| 1 | Midtown Center | 12206259 |
| 2 | Upper East Side South | 11901808 |
| 3 | Murray Hill | 10283074 |
| 4 | Times Sq/Theatre District | 9867711 |
| ... | ... | ... |
| 256 | Freshkills Park | 347 |
| 257 | Governor's Island/Ellis Island/Liberty Island | 297 |
| 258 | Jamaica Bay | 271 |
| 259 | Rikers Island | 61 |
| 260 | Great Kills Park | 13 |
261 rows × 2 columns
# Frequency of each pickup zone.
conteo_serie = df_persisted["PULocation"].value_counts().compute()
frecuencias = conteo_serie.reset_index()
frecuencias.columns = ["PULocation", 'Conteo']
display(frecuencias)
fig = px.bar(frecuencias, x="PULocation", y="Conteo")
fig.show()
| PULocation | Conteo | |
|---|---|---|
| 0 | Upper East Side South | 13486927 |
| 1 | Midtown Center | 12622781 |
| 2 | Upper East Side North | 12266504 |
| 3 | Penn Station/Madison Sq West | 11353992 |
| 4 | Midtown East | 11350909 |
| ... | ... | ... |
| 256 | Rossville/Woodrow | 153 |
| 257 | Freshkills Park | 147 |
| 258 | Oakwood | 140 |
| 259 | Rikers Island | 70 |
| 260 | Great Kills Park | 9 |
261 rows × 2 columns
%%time
# Mean trip distance for each passenger-count value.
grouped_df = df_persisted.groupby(df_persisted.passenger_count).trip_distance.mean().compute()
grouped_df
CPU times: user 312 ms, sys: 3.66 ms, total: 315 ms Wall time: 2min 22s
passenger_count 0.0 2.768555 1.0 2.883428 2.0 3.091694 3.0 3.038092 4.0 3.111862 5.0 3.003721 6.0 2.989049 7.0 3.283246 8.0 5.079797 9.0 5.679011 192.0 1.010000 96.0 0.830000 Name: trip_distance, dtype: float64
%%time
# Maximum recorded trip distance; the result (~350,914) indicates data errors.
max_trip_dist = df_persisted.trip_distance.max().compute()
max_trip_dist
CPU times: user 457 ms, sys: 12.4 ms, total: 470 ms Wall time: 1min 41s
350914.89
# Missing-value count per column, computed on the cluster.
df_persisted.isna().sum().compute()
passenger_count 1056169 trip_distance 0 payment_type 1056169 total_amount 0 year 0 month 0 dayofmonth 0 dayofweek 0 hour 0 trip_dur_secs 0 DOLocation 0 PULocation 0 dtype: int64
## Top 10 rides by fare amount, keeping the pickup zone for labeling
most_paid_rides_dask = df_persisted[['PULocation', 'total_amount']].nlargest(10, "total_amount")

## Horizontal bar plot of the best-paid rides
import matplotlib.pyplot as plt
top_fares = most_paid_rides_dask.set_index('PULocation', sorted=True).compute()
top_fares.plot(kind='barh', stacked=False, figsize=[10, 8], legend=True)
plt.title('Viajes mejor pagos')
plt.xlabel('Total Amount')
plt.ylabel('PU Location')
plt.show()
## Fares of the 10 longest trips, indexed by trip distance
import matplotlib.pyplot as plt
most_paid_rides_dask2 = df_persisted[['trip_distance', 'total_amount']].nlargest(10, "trip_distance")
longest_trips = most_paid_rides_dask2.set_index('trip_distance', sorted=True).compute()
longest_trips.plot(kind='bar', colormap='PiYG', stacked=False, figsize=[10, 8], legend=True)
plt.title('Fares by Distance')
plt.xlabel('Trip Distance')
plt.ylabel('Total Amount')
plt.show()
# Library for the t-test
from scipy.stats import ttest_ind
# To compute the p-value of the F statistic
import scipy.stats as stats
# Alternative backend to parallelize work across the cluster nodes
import joblib
# Descriptive statistics of the numeric feature columns (distributed computation).
descriptive_df = df_persisted[["passenger_count", "trip_distance", "total_amount", "trip_dur_secs"]].describe().compute()
descriptive_df
| passenger_count | trip_distance | total_amount | trip_dur_secs | |
|---|---|---|---|---|
| count | 3.242907e+08 | 3.253469e+08 | 3.253469e+08 | 3.253469e+08 |
| mean | 1.587871e+00 | 2.994856e+00 | 1.724392e+01 | 1.036081e+03 |
| std | 1.232861e+00 | 9.023515e+01 | 1.934956e+02 | 3.909878e+03 |
| min | 0.000000e+00 | -3.726453e+04 | -1.871800e+03 | 0.000000e+00 |
| 25% | 1.000000e+00 | 1.130000e+00 | 1.130000e+01 | 5.150000e+02 |
| 50% | 1.000000e+00 | 2.240000e+00 | 1.580000e+01 | 8.680000e+02 |
| 75% | 2.000000e+00 | 9.540000e+00 | 4.300000e+01 | 2.220000e+03 |
| max | 1.920000e+02 | 3.509149e+05 | 1.084772e+06 | 8.639900e+04 |
Vamos a analizar la diferencia entre la media de la tarifa entre los años 2017 y 2019.
import math
from scipy.stats import t

# Two-sample t-test: did the mean fare (total_amount) change from 2017 to 2019?
df_persisted_17 = client.persist(df_persisted[df_persisted["year"] == 2017]["total_amount"])
df_persisted_19 = client.persist(df_persisted[df_persisted["year"] == 2019]["total_amount"])

mean1, mean2 = df_persisted_17.mean().compute(), df_persisted_19.mean().compute()
std1, std2 = df_persisted_17.std().compute(), df_persisted_19.std().compute()
# BUG FIX: n2 was previously taken from the 2017 series a second time;
# it must be the size of the 2019 sample.
n1, n2 = df_persisted_17.shape[0].compute(), df_persisted_19.shape[0].compute()

# Standard errors of each mean and their combined standard error.
se1, se2 = std1 / math.sqrt(n1), std2 / math.sqrt(n2)
sed = math.sqrt(se1 ** 2.0 + se2 ** 2.0)
t_stat = (mean1 - mean2) / sed

# Degrees of freedom. Renamed from `df` so the Dask dataframe bound to `df`
# earlier in the notebook is not silently overwritten by an integer.
dof = n1 + n2 - 2
alpha = 0.05
cv = t.ppf(1.0 - alpha, dof)  # one-sided critical value at alpha
# Two-sided p-value from the t distribution.
p = (1 - t.cdf(abs(t_stat), dof)) * 2
display('Valor de t: '+str(t_stat))
display('Valor p: '+ str(p))
'Valor de t: -111.84969194168677'
'Valor p: 0.0'
import pandas as pd
# Build an empty ANOVA table indexed by source of variation.
data_col = [['SSA', '', '', '', '', ''], ['SSE', '', '', '', '', ''], ['Total', '', '', '', '', '']]
anova_tabla = pd.DataFrame(data_col, columns = ['Fuente de variación', 'SS', 'gdl', 'MS', 'F', 'P-valor'])
anova_tabla.set_index('Fuente de variación', inplace = True)
df_persisted_grouped = df_persisted.groupby('payment_type')
# Per-group counts, materialized to pandas so they can be combined with the
# other (pandas) aggregates below.
pmt_type_count = df_persisted_grouped.count().compute()
# SSA (between-groups sum of squares): n_g * (group mean - grand mean)^2.
promedio_tarifa = df_persisted['total_amount'].mean().compute()
SSA = pmt_type_count * (df_persisted_grouped.mean().compute() - promedio_tarifa)**2
anova_tabla.at['SSA', 'SS'] = SSA['total_amount'].sum()
# SSE (within-groups sum of squares): (n_g - 1) * group variance.
# BUG FIX: df_persisted_grouped.std() is a lazy Dask object; multiplying it by
# the pandas pmt_type_count raised "ValueError: Unable to coerce to Series".
# It must be computed to pandas first.
SSE = (pmt_type_count - 1) * df_persisted_grouped.std().compute()**2
anova_tabla.at['SSE', 'SS'] = SSE['total_amount'].sum()
# SST = SSA + SSE, stored in the 'Total' row.
SST = SSA['total_amount'].sum() + SSE['total_amount'].sum()
anova_tabla.at['Total', 'SS']= SST
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) <ipython-input-28-4b6a79e86c95> in <module> 15 16 # Calculas la SSE y actualizar en la tabla ---> 17 SSE = (pmt_type_count - 1) * df_persisted_grouped.std()**2 18 anova_tabla.at['SSE', 'SS'] = SSE['total_amount'].sum() 19 ~/anaconda3/envs/daskpy36/lib/python3.6/site-packages/pandas/core/ops/__init__.py in f(self, other, axis, level, fill_value) 693 def f(self, other, axis=default_axis, level=None, fill_value=None): 694 --> 695 other = _align_method_FRAME(self, other, axis) 696 697 if isinstance(other, ABCDataFrame): ~/anaconda3/envs/daskpy36/lib/python3.6/site-packages/pandas/core/ops/__init__.py in _align_method_FRAME(left, right, axis) 671 elif is_list_like(right) and not isinstance(right, (ABCSeries, ABCDataFrame)): 672 # GH17901 --> 673 right = to_series(right) 674 675 return right ~/anaconda3/envs/daskpy36/lib/python3.6/site-packages/pandas/core/ops/__init__.py in to_series(right) 634 if len(left.columns) != len(right): 635 raise ValueError( --> 636 msg.format(req_len=len(left.columns), given_len=len(right)) 637 ) 638 right = left._constructor_sliced(right, index=left.columns) ValueError: Unable to coerce to Series, length must be 11: given 5
This section will demonstrate how to perform regression modeling using Scikit-learn on a distributed Dask back-end. We will continue with the New York taxi trips dataset, but now predict the duration of the trip using linear regression.
Many Scikit-Learn algorithms are written for parallel execution using Joblib, which natively provides thread-based and process-based parallelism. Joblib is what backs the n_jobs= parameter in normal use of Scikit-Learn. Dask can scale these Joblib-backed algorithms out to a cluster of machines by providing an alternative Joblib backend.
# Load the Feb-2018 green-taxi trips from the public S3 bucket and take an 80%
# sample with replacement (bootstrap-style; rows may repeat).
dfl = dd.read_csv(
's3://nyc-tlc/trip data/green_tripdata_2018-02.csv', storage_options={'anon': True},
parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
).sample(frac=0.8, replace=True)
# Target variable: trip duration as a timedelta...
dfl['trip_duration'] = dfl['lpep_dropoff_datetime'] - dfl['lpep_pickup_datetime']
import numpy as np
# ...converted to fractional days, then to hours.
dfl['trip_duration'] = dfl['trip_duration']/np.timedelta64(1,'D')
dfl['trip_duration'] = dfl['trip_duration'] * 24
dfl['trip_duration']
len(dfl)
dfl.head()
# Replace missing values with 0 before encoding.
dfl = dfl.fillna(value=0)
# One-hot encode the categorical columns (categorize() is required by Dask
# before get_dummies) and materialize the result to pandas.
dfl = dd.get_dummies(dfl.categorize()).compute()
dfl.head()
# Feature matrix and target vector for the regression.
x = dfl[['VendorID','RatecodeID','PULocationID','DOLocationID','passenger_count','trip_distance','fare_amount','total_amount']]
y = dfl['trip_duration']
from dask_ml.model_selection import train_test_split

# Partition features/target into train and test sets; the fixed seed keeps the
# split reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(x, y, random_state=1)
len(X_train), len(X_test), len(y_train), len(y_test)

# Extract the underlying arrays that the estimator will consume.
training_x, testing_x = X_train.values, X_test.values
training_y, testing_y = y_train.values, y_test.values
import numpy as np
from sklearn.metrics import mean_squared_error
def rmse(preds, actuals):
    """Print and return the root-mean-squared error between predictions and actuals.

    Parameters
    ----------
    preds, actuals : array-like of float
        Predicted and observed values of equal length.

    Returns
    -------
    float
        sqrt(mean((actuals - preds) ** 2)).
    """
    # Mean of squared residuals; identical to sklearn's mean_squared_error
    # for the 1-D arrays used in this notebook.
    error = np.mean((np.asarray(actuals) - np.asarray(preds)) ** 2)
    result = np.sqrt(error)
    print(result)  # keep the original side effect for notebook display
    # BUG FIX: the original computed the value but returned None, so callers
    # could not use the metric programmatically.
    return result
import joblib
from dask_ml.linear_model import LinearRegression

# Fit a linear regression on the cluster, routing any Joblib-parallelized work
# through the Dask backend, then score the held-out set.
with joblib.parallel_backend('dask'):
    lr = LinearRegression(fit_intercept=True, random_state=1)
    lr.fit(training_x, training_y)
    preds = lr.predict(testing_x)
preds
!sudo aws ecs update-service --service Dask-Workers --desired-count 1 --cluster Fargate-Dask-Cluster